home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / tcop / slaves.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  28.1 KB  |  1,050 lines

  1. /* ----------------------------------------------------------------
  2.  *   FILE
  3.  *    slaves.c
  4.  *    
  5.  *   DESCRIPTION
  6.  *    slave backend management routines
  7.  *
  8.  *   INTERFACE ROUTINES
  9.  *    SlaveMain()
  10.  *    SlaveBackendsInit()
  11.  *    SlaveBackendsAbort()
  12.  *
  13.  *   NOTES
  14.  *
  15.  *   IDENTIFICATION
  16.  *    $Header: /private/postgres/src/tcop/RCS/slaves.c,v 1.24 1992/07/06 05:03:08 mao Exp $
  17.  * ----------------------------------------------------------------
  18.  */
  19. #include <signal.h>
  20. #include <setjmp.h>
  21.  
  22. #include "tmp/postgres.h"
  23.  
  24.  RcsId("$Header: /private/postgres/src/tcop/RCS/slaves.c,v 1.24 1992/07/06 05:03:08 mao Exp $");
  25.  
  26. /* ----------------
  27.  *    FILE INCLUDE ORDER GUIDELINES
  28.  *
  29.  *    1) tcopdebug.h
  30.  *    2) various support files ("everything else")
  31.  *    3) node files
  32.  *    4) catalog/ files
  33.  *    5) execdefs.h and execmisc.h, if necessary.
  34.  *    6) extern files come last.
  35.  * ----------------
  36.  */
  37. #include "tcop/tcopdebug.h"
  38. #include "nodes/plannodes.h"
  39. #include "nodes/plannodes.a.h"
  40. #include "nodes/execnodes.h"
  41. #include "executor/execdesc.h"
  42. #include "tcop/dest.h"
  43. #include "tcop/pquery.h"
  44. #include "tcop/slaves.h"
  45.  
  46. #include "access/xact.h"
  47. #include "utils/log.h"
  48. #include "catalog/syscache.h"
  49.  
  50. /* ----------------
  51.  *    parallel state variables
  52.  * ----------------
  53.  */
  54. /*
  55.  *    local data structures
  56.  */
  57. extern int MyPid; /* int representing the process id, defined in execipc.c */
  58. static ProcessNode *SlaveArray, *FreeSlaveP;
  59. int NumberOfFreeSlaves;
  60. ProcGroupLocalInfo    ProcGroupLocalInfoP; /* process group local info */
  61. static ProcGroupLocalInfo FreeProcGroupP;
  62. extern SlaveLocalInfoData SlaveLocalInfoD;  /* defined in execipc.c */
  63. extern int AdjustParallelismEnabled;
  64. FILE *StatFp;
  65. static bool RestartForParAdj = false;  /* indicating that the longjmp to
  66.                       SlaveRestartPoint is for paradj */
  67. static List QueryDesc;
  68.  
  69. /*
  70.  *    shared data structures
  71.  */
  72. int     *MasterProcessIdP;    /* master backend process id */
  73. int    *SlaveAbortFlagP;    /* flag set during a transaction abort */
  74. extern SlaveInfo    SlaveInfoP;    /* slave backend info */
  75. extern ProcGroupInfo  ProcGroupInfoP; /* process group info */
  76.                                       /* defined in execipc.c to make
  77.                                          postmaster happy */
  78.  
  79. TransactionState SharedTransactionState; /* current transaction info */
  80.  
  81. #define CONDITION_NORMAL    0
  82. #define CONDITION_ABORT        1
  83.  
  84. /* --------------------------------
  85.  *    SendAbortSignals
  86.  *
  87.  *    This sends a SIGHUP to every other backend in order
  88.  *    to cause them to preform their abort processing when
  89.  *    we discover a reason to abort the current transaction.
  90.  * --------------------------------
  91.  */
  92. void
  93. SendAbortSignals()    
  94. {
  95.     int nslaves;        /* number of slaves */
  96.     int i;            /* counter */
  97.     int p;            /* process id */
  98.  
  99. #ifdef TCOP_SLAVESYNCDEBUG    
  100.     if (IsMaster)
  101.     elog(DEBUG, "Master Backend sending abort signals");
  102.     else
  103.     elog(DEBUG, "Slave Backend %d sending abort signals", MyPid);
  104. #endif TCOP_SLAVESYNCDEBUG    
  105.  
  106.     nslaves = GetNumberSlaveBackends();
  107.     for (i=0; i<nslaves; i++)
  108.     if (i != MyPid) {
  109.         p = SlaveInfoP[i].unixPid;
  110.         if (kill(p, SIGHUP) != 0) {
  111.         fprintf(stderr, "signaling slave %d (pid %d): ", i, p);
  112.         perror("kill");
  113.         }
  114.     }
  115.     
  116.     if (! IsMaster) {
  117.     p = (*MasterProcessIdP);
  118.     if (kill(p, SIGHUP) != 0) {
  119.         fprintf(stderr, "signaling master (pid %d): ", p);
  120.         perror("kill");
  121.     }
  122.     }
  123. }
  124.  
  125. /* --------------------------------
  126.  *    SlaveRestart
  127.  *
  128.  *    This is the signal / exception handler for the slave
  129.  *    backends.  It causes processing to resume at the top
  130.  *    of SlaveMain(), right before SlaveBackendsAbort().
  131.  * --------------------------------
  132.  */
  133. jmp_buf    SlaveRestartPoint;
  134. int    SlaveWarnings = 0;
  135.  
  136. void
  137. SlaveRestart()
  138. {
  139.     SLAVE1_elog(DEBUG, "Slave Backend %d *** SlaveRestart ***", MyPid);
  140.  
  141.     longjmp(SlaveRestartPoint, 1);
  142. }
  143.  
  144. /* --------------------------------
  145.  *    SlaveBackendsAbort
  146.  *
  147.  *    this is called in each process when trouble arises.
  148.  * --------------------------------
  149.  */
  150. void
  151. SlaveBackendsAbort()
  152. {
  153.     int nslaves;        /* number of slaves */
  154.     int i;            /* counter */
  155.     
  156.     /* ----------------
  157.      *    if the abort flag is not set, then it means the problem
  158.      *  occurred within our process.  So set the flag and tell
  159.      *  all the other backends to abort.
  160.      *
  161.      *  only the aborting process does this:
  162.      * ----------------
  163.      */
  164.     if ((*SlaveAbortFlagP) != CONDITION_ABORT) {
  165.     (*SlaveAbortFlagP) = CONDITION_ABORT;
  166.     
  167.     SendAbortSignals();
  168.     }
  169.  
  170.     /* ----------------
  171.      *    all processes do this:
  172.      * ----------------
  173.      */
  174.     if (IsMaster) {
  175.     /* ----------------
  176.      *  in the master:
  177.      *
  178.      *    + we record the aborted transaction,
  179.      *      + reinitialize the synchronization semaphores
  180.      *      + reinitialize the executor shared memory
  181.      *      + signal the slave backends to cleanup.
  182.      *
  183.      *  We then wait for all the slaves to finish doing their
  184.      *  cleanup and then clear the abort flag.
  185.      * ----------------
  186.      */
  187.     SLAVE_elog(DEBUG, "Master Backend aborting current transaction");
  188.     AbortCurrentTransaction();
  189.  
  190.     SLAVE_elog(DEBUG, "Master Backend reinitializing semaphores");
  191.     nslaves = GetNumberSlaveBackends();
  192.     for (i=0; i<nslaves; i++) {
  193.         I_Start(i);
  194.     }
  195.     I_Finished();
  196.     
  197.     SLAVE_elog(DEBUG, "Master Backend reinitializing shared memory");
  198.     ExecSMInit();
  199.     
  200.     SLAVE_elog(DEBUG, "Master Backend signaling slaves abort");
  201.     V_Abort();
  202.  
  203.     SLAVE_elog(DEBUG, "Master Backend waiting for slave aborts");
  204.     P_FinishedAbort();
  205.  
  206.     SLAVE_elog(DEBUG, "Master Backend reinitializing abort semaphore");
  207.     I_Abort();
  208.  
  209.     SLAVE_elog(DEBUG, "Master Backend abort processing finished");
  210.     (*SlaveAbortFlagP) = CONDITION_NORMAL;
  211.  
  212.     } else {
  213.     /* ----------------
  214.      *  the slave backends preform their cleanup processing
  215.      *  after the master backend records the abort.  This is
  216.      *  guaranteed because the slaves P on the semaphore which
  217.      *  isn't V'ed by the master until after its done aborting.
  218.      * ----------------
  219.      */
  220.     SLAVE1_elog(DEBUG, "Slave Backend %d waiting to abort..",
  221.             MyPid);
  222.     P_Abort();
  223.  
  224.     SLAVE1_elog(DEBUG, "Slave Backend %d processing abort..",
  225.             MyPid);
  226.     SlaveAbortTransaction();
  227.  
  228.     SLAVE1_elog(DEBUG, "Slave Backend %d abort finished, signaling..",
  229.             MyPid);
  230.     
  231.     V_Abort();
  232.     V_FinishedAbort();
  233.     }
  234. }
  235.  
  236. /* --------------------------------
  237.  *    SlaveMain is the main loop executed by the slave backends
  238.  * --------------------------------
  239.  */
  240. void
  241. SlaveMain()
  242. {
  243.     int i;
  244.     int groupid;
  245.     extern int ShowExecutorStats;
  246.  
  247.     /* -----------------
  248.      * set the flag to false in case the SIGPARADJ is received
  249.      * -----------------
  250.      */
  251.     SlaveLocalInfoD.isworking = false;
  252.     /* ------------------
  253.      * set stat file pointer if required
  254.      * ------------------
  255.      */
  256.     if (ShowExecutorStats) {
  257.     char fname[30];
  258.     sprintf(fname, "/usr/tmp/hong/slave%d.stat", MyPid);
  259.         StatFp = fopen(fname, "w");
  260.       }
  261.     /* ----------------
  262.      *  before we begin processing we have to register a SIGHUP
  263.      *  handler so that if a problem occurs in one slave backend,
  264.      *  all the backends resynchronize.
  265.      * ----------------
  266.      */
  267.     if (setjmp(SlaveRestartPoint) != 0) {
  268.     if (RestartForParAdj) {
  269.         /* restart for parallelism adjustment */
  270.         RestartForParAdj = false;
  271.       }
  272.     else {
  273.         /* restart for transaction abort */
  274.         SlaveWarnings++;
  275.         SLAVE1_elog(DEBUG, "Slave Backend %d SlaveBackendsAbort()",
  276.                 MyPid);
  277.     
  278.         SlaveBackendsAbort();
  279.         SLAVE1_elog(DEBUG, "Slave Backend %d SlaveBackendsAbort() done",
  280.                 MyPid);
  281.       }
  282.       }    
  283.  
  284.     signal(SIGHUP, SlaveRestart);
  285.  
  286.     /* ----------------
  287.      *    POSTGRES slave processing loop begins here
  288.      * ----------------
  289.      */
  290.     SLAVE1_elog(DEBUG, "Slave Backend %d entering slave loop...", MyPid);
  291.     
  292.     for(;;) {
  293.     /* ----------------
  294.      *  setup caches, lock tables and memory stuff just
  295.      *  as if we were running within a transaction.
  296.      * ----------------
  297.      */
  298.     SlaveStartTransaction();
  299.     
  300.     /* ----------------
  301.      *  The master V's on the slave start semaphores after
  302.      *  placing the plan fragment in shared memory.
  303.      *  Meanwhile each slave waits on its start semaphore
  304.      *  until signaled by the master.
  305.      * ----------------
  306.      */
  307.     SLAVE1_elog(DEBUG, "Slave Backend %d waiting...", MyPid);
  308.     
  309.     P_Start(MyPid);
  310.     
  311.     SLAVE1_elog(DEBUG, "Slave Backend %d starting task.", MyPid);
  312.  
  313.     if (ShowExecutorStats)
  314.         ResetUsage();
  315.     /* ------------------
  316.      * initialize slave local info
  317.      * ------------------
  318.      */
  319.     groupid = SlaveInfoP[MyPid].groupId;
  320.     SlaveLocalInfoD.startpage = SlaveInfoP[MyPid].groupPid +
  321.                     (SlaveInfoP[MyPid].isAddOnSlave ? 
  322.                      ProcGroupInfoP[groupid].paradjpage : 0);
  323.     SlaveLocalInfoD.nparallel = ProcGroupInfoP[groupid].nprocess;
  324.     SlaveLocalInfoD.paradjpending = false;
  325.     SlaveLocalInfoD.paradjpage = -1;
  326.     SlaveLocalInfoD.newparallel = -1;
  327.     SlaveLocalInfoD.heapscandesc = NULL;
  328.     SlaveLocalInfoD.indexscandesc = NULL;
  329.     SlaveLocalInfoD.isworking = true;
  330.  
  331.     /* ----------------
  332.      *  get the query descriptor to execute.
  333.      * ----------------
  334.      */
  335.     QueryDesc = (List)CopyObject((List)ProcGroupInfoP[groupid].queryDesc);
  336.  
  337.     /* ----------------
  338.      *  process the query descriptor
  339.      * ----------------
  340.      */
  341.     if (QueryDesc != NULL)
  342.         ProcessQueryDesc(QueryDesc);
  343.  
  344.     /* ---------------
  345.      * it is important to set isDone to true before
  346.      * SlaveCommitTransaction(), because it may
  347.      * free the memory of SlaveInfoP[MyPid].heapscandesc
  348.      * ---------------
  349.      */
  350.     SlaveInfoP[MyPid].isDone = true;
  351.  
  352.     if (ShowExecutorStats)
  353.         ShowUsage();
  354.     /* ----------------
  355.      *  clean caches, lock tables and memory stuff just
  356.      *  as if we were running within a transaction.
  357.      * ----------------
  358.      */
  359.     SlaveCommitTransaction();
  360.         
  361.     /* ----------------
  362.      *  when the slave finishes, it signals the master
  363.      *  backend by V'ing on the master's finished semaphore.
  364.      *
  365.      *  Since the master started by placing nslaves locks
  366.      *  on the semaphore, the semaphore will be decremented
  367.      *  each time a slave finishes.  the last slave to finish
  368.      *  should bring the finished count to 1;
  369.      * ----------------
  370.      */
  371.     SLAVE1_elog(DEBUG, "Slave Backend %d task complete.", MyPid);
  372.     SlaveLocalInfoD.isworking = false;
  373.     V_Finished(groupid, &(ProcGroupInfoP[groupid].scounter), FINISHED);
  374.     }
  375. }
  376.  
  377. /* --------------------------------
  378.  *    MoveTransactionState copies the transaction system's
  379.  *    CurrentTransactionState information into permanent shared
  380.  *    memory and then sets the transaction system's state
  381.  *    pointer to the shared memory copy.  This is necessary to
  382.  *    have the transaction system use the shared state in each
  383.  *    of the parallel backends.
  384.  * --------------------------------
  385.  */
  386. void
  387. MoveTransactionState()
  388. {
  389.     int nbytes = sizeof(TransactionStateData);
  390.     extern TransactionState CurrentTransactionState;
  391.     
  392.     bcopy(CurrentTransactionState, SharedTransactionState, nbytes);
  393.     CurrentTransactionState = SharedTransactionState;
  394. }
  395.     
  396. /* --------------------------------
  397.  *    SlaveBackendsInit
  398.  *
  399.  *    SlaveBackendsInit initializes the communication structures,
  400.  *    forks several "worker" backends and returns.
  401.  *
  402.  *    Note: this function only returns in the master
  403.  *          backend.  The worker backends run forever in
  404.  *          a separate execution loop.
  405.  * --------------------------------
  406.  */
  407.  
  408. void
  409. SlaveBackendsInit()
  410. {
  411.     int nslaves;        /* number of slaves */
  412.     int i;            /* counter */
  413.     int p;            /* process id returned by fork() */
  414.     extern int ProcessAffinityOn; /* process affinity on flag */
  415.     
  416.     /* ----------------
  417.      *  first initialize shared memory and get the number of
  418.      *  slave backends.
  419.      * ----------------
  420.      */
  421.     nslaves = GetNumberSlaveBackends();
  422.  
  423.     /* -----------------
  424.      * the following calls to SearchSysCacheTuple() are total hacks
  425.      * what they do is to pre-initialize all the caches so that
  426.      * performance numbers will look better.  they can be removed at
  427.      * any time.
  428.      * -----------------
  429.      */
  430.     SearchSysCacheTuple(AMOPOPID, NULL, NULL, NULL, NULL);
  431.     SearchSysCacheTuple(AMOPSTRATEGY, NULL, NULL, NULL, NULL);
  432.     SearchSysCacheTuple(ATTNAME, "", NULL, NULL, NULL);
  433.     SearchSysCacheTuple(ATTNUM, NULL, NULL, NULL, NULL);
  434.     SearchSysCacheTuple(INDEXRELID, NULL, NULL, NULL, NULL);
  435.     SearchSysCacheTuple(LANNAME, "", NULL, NULL, NULL);
  436.     SearchSysCacheTuple(OPRNAME, "", NULL, NULL, NULL);
  437.     SearchSysCacheTuple(OPROID, NULL, NULL, NULL, NULL);
  438.     SearchSysCacheTuple(PRONAME, "", NULL, NULL, NULL);
  439.     SearchSysCacheTuple(PROOID, NULL, NULL, NULL, NULL);
  440.     SearchSysCacheTuple(RELNAME, "", NULL, NULL, NULL);
  441.     SearchSysCacheTuple(RELOID, NULL, NULL, NULL, NULL);
  442.     SearchSysCacheTuple(TYPNAME, "", NULL, NULL, NULL);
  443.     SearchSysCacheTuple(TYPOID, NULL, NULL, NULL, NULL);
  444.     SearchSysCacheTuple(AMNAME, "", NULL, NULL, NULL);
  445.     SearchSysCacheTuple(CLANAME, "", NULL, NULL, NULL);
  446.     SearchSysCacheTuple(INDRELIDKEY, NULL, NULL, NULL, NULL);
  447.     SearchSysCacheTuple(INHRELID, NULL, NULL, NULL, NULL);
  448.     SearchSysCacheTuple(PRS2PLANCODE, NULL, NULL, NULL, NULL);
  449.     SearchSysCacheTuple(RULOID, NULL, NULL, NULL, NULL);
  450.     SearchSysCacheTuple(PRS2STUB, NULL, NULL, NULL, NULL);
  451.     
  452.     /* --------------------
  453.      * set signal for dynamically adjusting degrees of parallelism
  454.      * --------------------
  455.      */
  456.     if (AdjustParallelismEnabled)
  457.         signal(SIGPARADJ, paradj_handler);
  458.  
  459.     /* ----------------
  460.      *    initialize Start, Finished, and Abort semaphores
  461.      * ----------------
  462.      */
  463.     for (i=0; i<nslaves; i++)
  464.     I_Start(i);
  465.     I_Finished();
  466.     I_Abort();
  467.     
  468.     /* ----------------
  469.      *    allocate area in shared memory for process ids and
  470.      *  communication mechanisms.  All backends share these pointers.
  471.      * ----------------
  472.      */
  473.     MasterProcessIdP = (int*)ExecSMReserve(sizeof(int));
  474.     SlaveAbortFlagP = (int*)ExecSMReserve(sizeof(int));
  475.     SlaveInfoP = (SlaveInfo)ExecSMReserve(nslaves * sizeof(SlaveInfoData));
  476.     ProcGroupInfoP = (ProcGroupInfo)ExecSMReserve(nslaves *
  477.                             sizeof(ProcGroupInfoData));
  478.     SharedTransactionState = (TransactionState)ExecSMReserve(
  479.                           sizeof(TransactionStateData));
  480.  
  481.     /* ----------------
  482.      *    move the transaction system state data into shared memory
  483.      *  before we fork so that all backends share the transaction state.
  484.      * ----------------
  485.      */
  486.     MoveTransactionState();
  487.     
  488.     /* ----------------
  489.      *    initialize executor shared memory
  490.      * ----------------
  491.      */
  492.     ExecSMInit();
  493.     /* ----------------
  494.      *    initialize shared memory variables
  495.      * ----------------
  496.      */
  497.     (*MasterProcessIdP) = getpid();
  498.     (*SlaveAbortFlagP) = CONDITION_NORMAL;
  499.     /* ----------------
  500.      *    fork several slave processes and save the process id's
  501.      * ----------------
  502.      */
  503.     MyPid = -1;
  504.     
  505.     for (i=0; i<nslaves; i++)
  506.     if (IsMaster) {
  507.         if ((p = fork()) != 0) {
  508.         if (p < 0) {
  509.             perror("fork");
  510.             exitpg(1);
  511.           }
  512.         /* initialize shared data structures */
  513.         SlaveInfoP[i].unixPid = p;
  514.         SlaveInfoP[i].groupId = -1;
  515.         SlaveInfoP[i].groupPid = -1;
  516.         SlaveInfoP[i].resultTmpRelDesc = NULL;
  517. #ifdef sequent
  518.         S_INIT_LOCK(&(SlaveInfoP[i].lock));
  519.         S_LOCK(&(SlaveInfoP[i].lock));
  520. #endif
  521.         ProcGroupInfoP[i].status = IDLE;
  522.         ProcGroupInfoP[i].queryDesc = NULL;
  523.         ProcGroupInfoP[i].scounter.count = 0;
  524.         ProcGroupInfoP[i].dropoutcounter.count = 0;
  525. #ifdef sequent
  526.         S_INIT_LOCK(&(ProcGroupInfoP[i].scounter.exlock));
  527.         S_INIT_LOCK(&(ProcGroupInfoP[i].dropoutcounter.exlock));
  528. #endif
  529.         ProcGroupInfoP[i].paradjpage = NULLPAGE;
  530.         InitMWaitOneLock(&(ProcGroupInfoP[i].m1lock));
  531.         } else {
  532.         MyPid = i;
  533. #ifdef sequent
  534.         if (ProcessAffinityOn && GetNumberSlaveBackends() <= 12) {
  535.             int prev;
  536.             /* ----------------------
  537.              * bind slave to a specific processor
  538.              * ----------------------
  539.              */
  540.             prev = tmp_affinity(MyPid);
  541.             if (prev == -1) {
  542.             fprintf(stderr, "slave %d ", MyPid);
  543.             perror("tmp_affinity");
  544.               }
  545.           }
  546. #endif
  547.         SLAVE1_elog(DEBUG, "Slave Backend %d alive!", MyPid);
  548.         }
  549.     }
  550.     
  551.     /* ----------------
  552.      *    now ExecIsMaster is true in the master backend and is false
  553.      *  in each of the slaves.  In addition, each slave is branded
  554.      *  with a slave id to identify it.  So, if we're a slave, we now
  555.      *  get sent off to the labor camp, never to return..
  556.      * ----------------
  557.      */
  558.     if (IsMaster) {
  559.     /* initialize local data structures of the master */
  560.     SlaveArray = (ProcessNode*)malloc(nslaves * sizeof(ProcessNode));
  561.     FreeSlaveP = SlaveArray;
  562.     NumberOfFreeSlaves = nslaves;
  563.     for (i=0; i<nslaves; i++) {
  564.         SlaveArray[i].pid = i;
  565.         SlaveArray[i].next = SlaveArray + i + 1;
  566.       }
  567.     SlaveArray[nslaves-1].next = NULL;
  568.     ProcGroupLocalInfoP = (ProcGroupLocalInfo)malloc(nslaves *
  569.                         sizeof(ProcGroupLocalInfoData));
  570.     FreeProcGroupP = ProcGroupLocalInfoP;
  571.     for (i=0; i<nslaves; i++) {
  572.         ProcGroupLocalInfoP[i].id = i;
  573.         ProcGroupLocalInfoP[i].fragment = NULL;
  574.         ProcGroupLocalInfoP[i].memberProc = NULL;
  575.         ProcGroupLocalInfoP[i].nextfree = ProcGroupLocalInfoP + i + 1;
  576.         ProcGroupLocalInfoP[i].resultTmpRelDescList = LispNil;
  577.       }
  578.     ProcGroupLocalInfoP[nslaves - 1].nextfree = NULL;
  579.       }
  580.     else {
  581.     SlaveMain();
  582.       }
  583.  
  584.     /* ----------------
  585.      *    here we're in the master and we've completed initialization
  586.      *  so we return to the read..parse..plan..execute loop.
  587.      * ----------------
  588.      */
  589.     return;
  590. }
  591.  
  592. /* ----------------------
  593.  *    getFreeSlave
  594.  *
  595.  *    get a free slave backend from the FreeSlaveP queue
  596.  *    also decrements NumberOfFreeSlaves
  597.  * ----------------------
  598.  */
  599. int
  600. getFreeSlave()
  601. {
  602.     ProcessNode *p;
  603.  
  604.     p = FreeSlaveP;
  605.     FreeSlaveP = p->next;
  606.     NumberOfFreeSlaves--;
  607.  
  608.     return p->pid;
  609. }
  610.  
  611. /* ------------------------
  612.  *    freeSlave
  613.  *
  614.  *    frees a slave to FreeSlaveP queue
  615.  *    increments NumberOfFreeSlaves
  616.  * ------------------------
  617.  */
  618. void
  619. freeSlave(i)
  620. int i;
  621. {
  622.     SlaveArray[i].next = FreeSlaveP;
  623.     FreeSlaveP = SlaveArray + i;
  624.     SlaveInfoP[i].groupId = -1;
  625.     SlaveInfoP[i].groupPid = -1;
  626.     NumberOfFreeSlaves++;
  627. }
  628.  
  629. /* ------------------------
  630.  *    getFreeProcGroup
  631.  *
  632.  *    get a free process group with nproc free slave processes
  633.  * ------------------------
  634.  */
  635. int
  636. getFreeProcGroup(nproc)
  637. int nproc;
  638. {
  639.     ProcGroupLocalInfo p;
  640.     ProcessNode *slavep;
  641.     int i;
  642.     int pid;
  643.  
  644.     p = FreeProcGroupP;
  645.     FreeProcGroupP = p->nextfree;
  646.     pid = getFreeSlave();
  647.     SlaveInfoP[pid].groupId = p->id;
  648.     SlaveInfoP[pid].groupPid = 0;
  649.     SlaveInfoP[pid].isAddOnSlave = false;
  650.     SlaveInfoP[pid].isDone = false;
  651.     p->memberProc = SlaveArray + pid;
  652.     slavep = p->memberProc;
  653.     for (i=1; i<nproc; i++) {
  654.     pid = getFreeSlave();
  655.     SlaveInfoP[pid].groupId = p->id;
  656.     SlaveInfoP[pid].groupPid = i;
  657.         SlaveInfoP[pid].isAddOnSlave = false;
  658.         SlaveInfoP[pid].isDone = false;
  659.     slavep->next = SlaveArray + pid;
  660.     slavep = slavep->next;
  661.       }
  662.     slavep->next = NULL;
  663.     return p->id;
  664. }
  665.  
  666. /* --------------------------
  667.  *    addSlaveToProcGroup
  668.  *
  669.  *    add a free slave to an existing process group
  670.  * --------------------------
  671.  */
  672. void
  673. addSlaveToProcGroup(slave, group, groupid)
  674. int slave;
  675. int group;
  676. int groupid;
  677. {
  678.     SlaveInfoP[slave].groupId = group;
  679.     SlaveInfoP[slave].groupPid = groupid;
  680.     SlaveInfoP[slave].isAddOnSlave = true;
  681.     SlaveArray[slave].next = ProcGroupLocalInfoP[group].memberProc;
  682.     ProcGroupLocalInfoP[group].memberProc = SlaveArray + slave;
  683. }
  684.  
  685. /* -------------------------
  686.  *    freeProcGroup
  687.  *
  688.  *    frees a process group and all the slaves in the group
  689.  * -------------------------
  690.  */
  691. void
  692. freeProcGroup(gid)
  693. int gid;
  694. {
  695.     ProcessNode *p, *nextp;
  696.  
  697.     p=ProcGroupLocalInfoP[gid].memberProc;
  698.     while (p != NULL) {
  699.     nextp = p->next;
  700.     freeSlave(p->pid);
  701.     p = nextp;
  702.       }
  703.     ProcGroupInfoP[gid].status = IDLE;
  704.     ProcGroupLocalInfoP[gid].fragment = NULL;
  705.     ProcGroupLocalInfoP[gid].nextfree = FreeProcGroupP;
  706.     ProcGroupLocalInfoP[gid].resultTmpRelDescList = LispNil;
  707.     FreeProcGroupP = ProcGroupLocalInfoP + gid;
  708. }
  709.  
  710. /* ----------------------------
  711.  *    getFinishedProcGroup
  712.  *
  713.  *    walks the array of processes group and find the first
  714.  *    process group with status = FINISHED or PARADJPENDING
  715.  * -----------------------------
  716.  */
  717. int
  718. getFinishedProcGroup()
  719. {
  720.     int i;
  721.  
  722.     for (i=0; i<GetNumberSlaveBackends(); i++) {
  723.     if (ProcGroupInfoP[i].status == FINISHED ||
  724.         ProcGroupInfoP[i].status == PARADJPENDING)
  725.         return i;
  726.       }
  727.     return -1;
  728. }
  729.  
  730. /* -------------------------------
  731.  *    wakeupProcGroup
  732.  *
  733.  *    wake up the processes in process group
  734.  * -------------------------------
  735.  */
  736. void
  737. wakeupProcGroup(groupid)
  738. int groupid;
  739. {
  740.     ProcessNode *p;
  741.  
  742.     for (p = ProcGroupLocalInfoP[groupid].memberProc;
  743.      p != NULL;
  744.      p = p->next) {
  745.       V_Start(p->pid);
  746.      }
  747. }
  748.  
  749. /* ---------------------------------
  750.  *    signalProcGroup
  751.  *
  752.  *    send a signal to a process group
  753.  * ---------------------------------
  754.  */
  755. void
  756. signalProcGroup(groupid, sig)
  757. int groupid;
  758. int sig;
  759. {
  760.     ProcessNode *p;
  761.  
  762.     for (p = ProcGroupLocalInfoP[groupid].memberProc;
  763.      p != NULL;
  764.      p = p->next) {
  765.     kill(SlaveInfoP[p->pid].unixPid, sig);
  766.       }
  767. }
  768.  
  769. /* ------------------------------
  770.  * the following routines are a specialized memory allocator for 
  771.  * the process groups.  they only supposed to be called by the master
  772.  * backend.  no mutex is done.
  773.  * ------------------------------
  774.  */
  775. static char *CurrentSMSegmentStart;
  776. static char *CurrentSMSegmentEnd;
  777. static int CurrentSMGroupid;
  778. static char *CurrentSMSegmentPointer;
  779.  
  780. /* --------------------------------
  781.  *    ProcGroupSMBeginAlloc
  782.  *
  783.  *    begins shared memory allocation for process group
  784.  * --------------------------------
  785.  */
  786. void
  787. ProcGroupSMBeginAlloc(groupid)
  788. int groupid;
  789. {
  790.     MemoryHeader mp;
  791.  
  792.     mp = ExecGetSMSegment();
  793.     ProcGroupLocalInfoP[groupid].groupSMQueue = mp;
  794.     mp->next = NULL;
  795.     CurrentSMSegmentStart = mp->beginaddr;
  796.     CurrentSMSegmentEnd =  CurrentSMSegmentStart + mp->size;
  797.     CurrentSMGroupid = groupid;
  798.     CurrentSMSegmentPointer = CurrentSMSegmentStart;
  799. }
  800.  
  801. /* -------------------------------
  802.  *    ProcGroupSMEndAlloc
  803.  *
  804.  *    ends shared memory allocation for process group
  805.  *    frees the leftover memory from current memory segment
  806.  * -------------------------------
  807.  */
  808. void
  809. ProcGroupSMEndAlloc()
  810. {
  811.     int usedsize;
  812.     MemoryHeader mp;
  813.  
  814.     usedsize = CurrentSMSegmentPointer - CurrentSMSegmentStart;
  815.     mp = ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue;
  816.     CurrentSMSegmentStart=CurrentSMSegmentPointer=CurrentSMSegmentEnd = NULL;
  817.     ExecSMSegmentFreeUnused(mp, usedsize);
  818. }
  819.  
  820. /* --------------------------------
  821.  *    ProcGroupSMAlloc
  822.  *
  823.  *    allocate shared memory within a process group
  824.  *    if the current memory segment runs out, allocate a new segment
  825.  * ---------------------------------
  826.  */
  827. char *
  828. ProcGroupSMAlloc(size)
  829. int size;
  830. {
  831.     MemoryHeader mp;
  832.     char *retP;
  833.  
  834.     while (CurrentSMSegmentPointer + size > CurrentSMSegmentEnd) {
  835.     mp = ExecGetSMSegment();
  836.     if (mp == NULL)
  837.         elog(WARN, "out of executor shared memory, got to die.");
  838.     mp->next = ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue;
  839.     ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue = mp;
  840.     CurrentSMSegmentStart = mp->beginaddr;
  841.     CurrentSMSegmentEnd =  CurrentSMSegmentStart + mp->size;
  842.     CurrentSMSegmentPointer = CurrentSMSegmentStart;
  843.       }
  844.     retP = CurrentSMSegmentPointer;
  845.     CurrentSMSegmentPointer = (char*)LONGALIGN(CurrentSMSegmentPointer + size);
  846.     return retP;
  847. }
  848.     
  849. /* -------------------------------
  850.  *    ProcGroupSMClean
  851.  *
  852.  *    frees the shared memory allocated for a process group
  853.  * -------------------------------
  854.  */
  855. void
  856. ProcGroupSMClean(groupid)
  857. int groupid;
  858. {
  859.     MemoryHeader mp, nextp;
  860.  
  861.     mp = ProcGroupLocalInfoP[groupid].groupSMQueue;
  862.     while (mp != NULL) {
  863.     nextp = mp->next;
  864.     ExecSMSegmentFree(mp);
  865.     mp = nextp;
  866.       }
  867. }
  868.  
  869. /* -----------------------------
  870.  * the following routines are special functions for copying reldescs to
  871.  * shared memory
  872.  * -----------------------------
  873.  */
  874. static char *SlaveTmpRelDescMemoryP;
  875.  
  876. /* -------------------------------
  877.  *    SlaveTmpRelDescInit
  878.  *
  879.  *    initialize shared memory preallocated for temporary relation descriptors
  880.  * -------------------------------
  881.  */
  882. void
  883. SlaveTmpRelDescInit()
  884. {
  885.     SlaveTmpRelDescMemoryP = (char*)SlaveInfoP[MyPid].resultTmpRelDesc;
  886. }
  887.  
  888. /* -------------------------------
  889.  *    SlaveTmpRelDescAlloc
  890.  *
  891.  *    memory allocation for reldesc copying
  892.  *    Note: there is no boundary checking, so had better pre-allocate
  893.  *    enough memory!
  894.  * -------------------------------
  895.  */
  896. char *
  897. SlaveTmpRelDescAlloc(size)
  898. int size;
  899. {
  900.     char *retP;
  901.  
  902.     retP = SlaveTmpRelDescMemoryP;
  903.     SlaveTmpRelDescMemoryP = (char*)LONGALIGN(SlaveTmpRelDescMemoryP + size);
  904.  
  905.     return retP;
  906. }
  907.  
  908. /* ---------------------------------
  909.  *    getProcGroupMaxPage
  910.  *
  911.  *    find out the largest page number the slaves are scanning
  912.  *    used only after SIGPARADJ signal has been sent to the
  913.  *    process group.
  914.  * ---------------------------------
  915.  */
  916. int
  917. getProcGroupMaxPage(groupid)
  918. int groupid;
  919. {
  920.     ProcessNode *p;
  921.     int maxpage = NULLPAGE;
  922.     int page;
  923.  
  924.     for (p = ProcGroupLocalInfoP[groupid].memberProc;
  925.      p != NULL;
  926.      p = p->next) {
  927. #ifdef HAS_TEST_AND_SET
  928.     S_LOCK(&(SlaveInfoP[p->pid].lock));
  929. #endif
  930.     page = SlaveInfoP[p->pid].curpage;
  931.     if (page == NOPARADJ)
  932.         maxpage = NOPARADJ;
  933.     if (maxpage < page && maxpage != NOPARADJ)
  934.         maxpage = page;
  935.       }
  936.     return maxpage;
  937. }
  938.  
  939. /* ---------------------------------------
  940.  *    paradj_handler
  941.  *
  942.  *    signal handler for dynamically adjusting degrees of parallelism
  943.  *    XXX only handle heap scan now.
  944.  * ---------------------------------------
  945.  */
  946. int
  947. paradj_handler()
  948. {
  949.     BlockNumber curpage;
  950.     HeapTuple curtuple;
  951.     ItemPointer tid;
  952.     int groupid;
  953.  
  954.     SLAVE1_elog(DEBUG, "slave %d got SIGPARADJ", MyPid);
  955.     if (SlaveInfoP[MyPid].isDone) {
  956.     /* -----------------------
  957.      * this means that the whole job is almost done
  958.      * no adjustment to parallelism should be made
  959.      * ------------------------
  960.      */
  961.     SlaveInfoP[MyPid].curpage = NOPARADJ;
  962.     curpage = NOPARADJ;
  963.       }
  964.     else
  965.     if (!SlaveLocalInfoD.isworking || SlaveLocalInfoD.heapscandesc == NULL) {
  966.     if (SlaveInfoP[MyPid].isAddOnSlave) {
  967.         SlaveInfoP[MyPid].curpage = SlaveLocalInfoD.startpage;
  968.         curpage = SlaveLocalInfoD.startpage;
  969.       }
  970.     else {
  971.         SlaveInfoP[MyPid].curpage = NULLPAGE;
  972.         curpage = NULLPAGE;
  973.       }
  974.       }
  975.     else {
  976.         curtuple = SlaveLocalInfoD.heapscandesc->rs_ctup;
  977.         tid = &(curtuple->t_ctid);
  978.     if (ItemPointerIsValid(tid)) {
  979.             curpage = ItemPointerGetBlockNumber(tid);
  980.       }
  981.     else {
  982.         curpage = NULLPAGE;
  983.       }
  984.         SlaveInfoP[MyPid].curpage = curpage;
  985.       }
  986. #ifdef HAS_TEST_AND_SET
  987.     S_UNLOCK(&(SlaveInfoP[MyPid].lock));
  988. #endif
  989.     SLAVE2_elog(DEBUG, "slave %d sending back curpage = %d", MyPid, curpage);
  990.     groupid = SlaveInfoP[MyPid].groupId;
  991.     MWaitOne(&(ProcGroupInfoP[groupid].m1lock));
  992.     SLAVE1_elog(DEBUG, "slave %d complete handshaking with master", MyPid);
  993.     if (ProcGroupInfoP[groupid].paradjpage == NOPARADJ) {
  994.     /* ----------------------
  995.      * this means that the master changed his/her mind
  996.      * no adjustment to parallelism will be done
  997.      * ----------------------
  998.      */
  999.     return;
  1000.       }
  1001.     SlaveLocalInfoD.paradjpending = true;
  1002.     SlaveLocalInfoD.paradjpage = ProcGroupInfoP[groupid].paradjpage;
  1003.     SlaveLocalInfoD.newparallel = ProcGroupInfoP[groupid].newparallel;
  1004.     return;
  1005. }
  1006.  
  1007. /* ------------------------------------
  1008.  *    paradj_nextpage
  1009.  *
  1010.  *    check if parallelism adjustment point is reached, if so
  1011.  *    figure out and return the next page to scan.
  1012.  *    XXX only works for heapscan right now.
  1013.  * -------------------------------------
  1014.  */
  1015. int
  1016. paradj_nextpage(page, dir)
  1017. int page;
  1018. int dir;
  1019. {
  1020.     if (SlaveLocalInfoD.paradjpending) {
  1021.         if (page >= SlaveLocalInfoD.paradjpage) {
  1022.             SLAVE2_elog(DEBUG, "slave %d adjusting page skip to %d",
  1023.                         MyPid, SlaveLocalInfoD.newparallel);
  1024.             if (SlaveLocalInfoD.newparallel >= SlaveLocalInfoD.nparallel ||
  1025.                 SlaveInfoP[MyPid].groupPid < SlaveLocalInfoD.newparallel) {
  1026.                 SlaveLocalInfoD.nparallel = SlaveLocalInfoD.newparallel;
  1027.                 SlaveLocalInfoD.paradjpending = false;
  1028.         if (dir < 0)
  1029.                     return 
  1030.             SlaveLocalInfoD.paradjpage-SlaveInfoP[MyPid].groupPid;
  1031.         else
  1032.                     return
  1033.             SlaveLocalInfoD.paradjpage+SlaveInfoP[MyPid].groupPid;
  1034.               }
  1035.             else {
  1036.                 int groupid = SlaveInfoP[MyPid].groupId;
  1037.         Plan plan = QdGetPlan(QueryDesc);
  1038.         EState estate = (EState)get_state(plan);
  1039.         EndPlan(plan, estate);
  1040.                 V_Finished(groupid, &(ProcGroupInfoP[groupid].dropoutcounter),
  1041.                            PARADJPENDING);
  1042.         RestartForParAdj = true;
  1043.         SlaveRestart();
  1044.               }
  1045.           }
  1046.       }
  1047.     else
  1048.     return NULLPAGE;
  1049. }
  1050.